Source code for nlp_architect.solutions.set_expansion.set_expand

# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import logging
import sys
from os import path

from argparse import ArgumentParser

from nlp_architect.models.np2vec import NP2vec
from nlp_architect.utils.io import validate_existing_filepath, check_size, load_json_file

logger = logging.getLogger(__name__)


class SetExpand(object):
    """
    Set expansion module, given a trained np2vec model.
    """

    def __init__(self, np2vec_model_file, binary=False, word_ngrams=False, grouping=False,
                 light_grouping=False, grouping_map_dir=None):
        """
        Load the np2vec model for set expansion.

        Args:
            np2vec_model_file (str): the file containing the np2vec model to load
            binary (bool): whether the np2vec model to load is in binary format
            word_ngrams (int {1,0}): if 1, the np2vec model to load uses word vectors with
                subword (ngrams) information.
            grouping (bool): whether to load the grouping maps.
            light_grouping (bool): whether to load only the np2id map, skipping the id2rep
                and id2group maps.
            grouping_map_dir (str): path to the directory containing maps for grouping;
                defaults to the directory of np2vec_model_file.
        """
        self.grouping = grouping
        if grouping:
            # load grouping info
            logger.info('loading grouping data')
            if not grouping_map_dir:
                grouping_map_dir = path.dirname(np2vec_model_file)
            self.np2id = load_json_file(path.join(grouping_map_dir, 'np2id'))
            if not light_grouping:
                self.id2rep = load_json_file(path.join(grouping_map_dir, 'id2rep'))
                self.id2group = load_json_file(path.join(grouping_map_dir, 'id2group'))
        logger.info('loading model...')
        self.np2vec_model = NP2vec.load(np2vec_model_file, binary=binary,
                                        word_ngrams=word_ngrams)
        # extract the first term of the model in order to get the marking character
        first_term = next(iter(self.np2vec_model.vocab.keys()))
        self.mark_char = first_term[-1]
        # precompute the L2-normalized vectors
        logger.info('computing L2 norms')
        self.np2vec_model.init_sims()
        logger.info('done init')
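    # A minimal usage sketch (hypothetical model path; assumes a model previously
    # trained and saved with NP2vec in non-binary format):
    #
    #     se = SetExpand(np2vec_model_file='models/np2vec.model')
    #     se.in_vocab('machine learning')  # True if the term is in the model vocabulary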
    def term2id(self, term, suffix=True):
        """
        Given a term, return its id.

        Args:
            term (str): term (noun phrase)
            suffix (bool): whether to append the marking character to the id

        Returns:
            the term's id if it is part of the model, else None
        """
        if self.grouping:
            if term not in self.np2id:
                return None
            term = self.np2id[term]
        id = term.replace(' ', self.mark_char)
        if suffix:
            id += self.mark_char
        if id not in self.np2vec_model.vocab:
            return None
        return id
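    # Example of the id encoding (assuming the marking character inferred from the
    # vocabulary is '_'): term2id('deep learning') yields 'deep_learning_', since
    # spaces are replaced by mark_char and, with suffix=True, a trailing mark_char
    # is appended.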
    def __id2term(self, id):
        """
        Given the id of a noun phrase, return the noun phrase string.

        Args:
            id (str): id

        Returns:
            term (noun phrase)
        """
        norm = id.replace(self.mark_char, ' ')[:-1]
        if self.grouping:
            if norm in self.id2rep:
                return self.id2rep[norm]
            logger.warning("id: #%s#, norm: #%s# is not in id2rep", id, norm)
            return ""
        return norm
    def get_vocab(self):
        """
        Return the vocabulary as the list of terms.

        Returns:
            the list of terms.
        """
        vocab = []
        for id in self.np2vec_model.vocab:
            term = self.__id2term(id)
            # __id2term returns an empty string for ids it cannot resolve, so test
            # for truthiness rather than None
            if term:
                vocab.append(term)
        return vocab
    def in_vocab(self, term):
        """
        Return True if the given term is part of the model vocabulary.
        """
        return self.term2id(term) is not None
    def get_group(self, term):
        """
        Return the group of terms that share the given term's id.
        """
        logger.info("get group of: %s", term)
        group = []
        if term in self.np2id:
            id = self.np2id[term]
            group = self.id2group[id]
        return group
    def similarity(self, terms, seed, threshold):
        """
        Return the subset of the given terms whose similarity to the seed exceeds
        the threshold.
        """
        similar = []
        seed_id = self.get_seed_id(seed)
        for term in terms:
            term_id = self.term2id(term)
            if term_id is not None:
                if self.seed2term_similarity(seed_id, [term_id]) > threshold:
                    similar.append(term)
            else:
                logger.info("term: %s is not in vocab", term)
        return similar
    # pylint: disable-msg=too-many-branches
    def expand(self, seed, topn=500):
        """
        Given a seed of terms, return the expanded set of terms.

        Args:
            seed: seed terms
            topn: maximal number of expanded terms to return

        Returns:
            up to topn expanded terms and their similarity scores
        """
        seed_ids = list()
        upper = True
        lower = True
        for np in seed:
            np = np.strip()
            if not self.grouping and (upper or lower):
                # the case feature is relevant only when grouping is disabled
                if np[0].islower():
                    upper = False
                else:
                    lower = False
            id = self.term2id(np)
            if id is not None:
                seed_ids.append(id)
            else:
                logger.warning("The term '%s' is out-of-vocabulary.", np)
        if len(seed_ids) > 0:
            if not self.grouping and (upper or lower):
                # the seed is case-consistent; over-fetch so enough candidates
                # survive the case filtering below
                res_id = self.np2vec_model.most_similar(seed_ids, topn=2 * topn)
            else:
                res_id = self.np2vec_model.most_similar(seed_ids, topn=topn)
            res = list()
            for r in res_id:
                if len(res) == topn:
                    break
                # pylint: disable=R0916
                if self.grouping or (not lower and not upper) or \
                        (upper and r[0][0].isupper()) or (lower and r[0][0].islower()):
                    term = self.__id2term(r[0])
                    if term:
                        res.append((term, r[1]))
            ret_val = res
        else:
            logger.info("All the seed terms are out-of-vocabulary.")
            ret_val = None
        return ret_val
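    # Usage sketch for expand() (hypothetical seed and scores; most_similar returns
    # cosine similarities):
    #
    #     expanded = se.expand(['paris', 'london', 'rome'], topn=10)
    #     # e.g. [('madrid', 0.81), ('berlin', 0.79), ...]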
    def get_seed_id(self, seed):
        """
        Return the ids of the seed terms that are part of the model vocabulary.
        """
        seed_ids = list()
        for np in seed:
            np = np.strip()
            id = self.term2id(np)
            if id is not None:
                seed_ids.append(id)
            else:
                logger.warning("The term '%s' is out-of-vocabulary.", np)
        return seed_ids
    def term2term_similarity(self, term_id_1, term_id_2):
        """
        Compute the cosine similarity between two term ids.

        Args:
            term_id_1: first term id
            term_id_2: second term id

        Returns:
            the similarity between the first and second term ids
        """
        logger.info('calculating similarity for: %s , %s', term_id_1, term_id_2)
        res = self.np2vec_model.similarity(term_id_1, term_id_2)
        logger.info("similarity result: %s", str(res))
        return res
    def seed2term_similarity(self, seed_id, term_id):
        """
        Compute the cosine similarity between a set of seed terms and a term.

        Args:
            seed_id: the seed term ids
            term_id: the term id

        Returns:
            the similarity between the seed terms and the term
        """
        logger.info('calculating similarity for: %s , %s', str(seed_id), term_id)
        res = self.np2vec_model.n_similarity(seed_id, list(term_id))
        logger.info("similarity result: %s", str(res))
        return res
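    # Sketch of the two similarity helpers (hypothetical terms; ids are obtained
    # first via term2id / get_seed_id):
    #
    #     s1 = se.term2term_similarity(se.term2id('paris'), se.term2id('london'))
    #     s2 = se.seed2term_similarity(se.get_seed_id(['paris', 'london']),
    #                                  [se.term2id('rome')])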
if __name__ == "__main__":
    arg_parser = ArgumentParser(__doc__)
    arg_parser.add_argument(
        '--np2vec_model_file',
        help='path to the file with the np2vec model to load.',
        type=validate_existing_filepath)
    arg_parser.add_argument(
        '--binary',
        help='boolean indicating whether the model to load has been stored in binary format.',
        action='store_true')
    arg_parser.add_argument(
        '--word_ngrams',
        default=0,
        type=int,
        choices=[0, 1],
        help='If 0, the model to load stores word information. If 1, the model to load stores '
             'subword (ngrams) information; note that subword information is relevant only to '
             'fasttext models.')
    arg_parser.add_argument(
        '--topn',
        default=500,
        type=int,
        action=check_size(min_size=1),
        help='maximal number of expanded terms to return')
    arg_parser.add_argument(
        '--grouping',
        action='store_true',
        default=False,
        help='grouping mode')
    args = arg_parser.parse_args()
    se = SetExpand(np2vec_model_file=args.np2vec_model_file, binary=args.binary,
                   word_ngrams=args.word_ngrams, grouping=args.grouping)
    enter_seed_str = 'Enter the seed (comma-separated seed terms):'
    logger.info(enter_seed_str)
    for seed_str in sys.stdin:
        seed_list = seed_str.strip().split(',')
        exp = se.expand(seed_list, args.topn)
        logger.info('Expanded results:')
        logger.info(exp)
        logger.info(enter_seed_str)
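# Example command-line session (hypothetical model path; assumes logging is
# configured to emit INFO messages so the prompt is visible):
#
#     python -m nlp_architect.solutions.set_expansion.set_expand \
#         --np2vec_model_file models/np2vec.model --topn 20
#     Enter the seed (comma-separated seed terms):
#     paris,london,rome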